QuickOPC User's Guide and Reference
Installed Examples - Console - SimpleUAStreamInsightApplication

This example shows how to create an OPC Unified Architecture data event source, and query it for events carrying even data value.

The main program:

// This example shows how to create an OPC Unified Architecture data event source, and query it for events carrying even data 
// value.

using System;
using System.Diagnostics;
using System.Reactive;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using OpcLabs.EasyOpc.UA.ComplexEventProcessing;
using OpcLabs.EasyOpc.UA.Reactive;

namespace SimpleUAStreamInsightApplication
{
    class Program
    {
        static void Main()
        {
            // Create an embedded StreamInsight server
            //using (var server = Server.Create("Default"))
            using (var server = Server.Create("Instance1"))
            {
                Debug.Assert(server != null);

                // Create a local end point for the server embedded in this program
                var managementService = server.CreateManagementService();
                Debug.Assert(managementService != null);
                var host = new ServiceHost(managementService);
                host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), 
                    "http://localhost/MyStreamInsightServer");
                host.Open();

                /* The following entities will be defined and available in the server for other clients:
                 * serverApp
                 * serverSource
                 * serverSink
                 * serverProcess
                 */

                // CREATE a StreamInsight APPLICATION in the server
                var myApp = server.CreateApplication("serverApp");

                // DEFINE a simple SOURCE (returns a point event every second)
                var observable = UADataChangeNotificationObservable.Create<int>(
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer",
                    "nsu=http://test.org/UA/Data/ ;i=11017", // Data/Dynamic/UserScalar/Int32Value
                    100);
                var mySource = myApp
                    .DefineObservable(() => observable)
                    .ToPointStreamable(
                        eventArgs => PointEvent.CreateInsert(
                            DateTimeOffset.Now, 
                            (UADataChangeNotificationPayload<int>)eventArgs),
                        AdvanceTimeSettings.StrictlyIncreasingStartTime);

                // DEPLOY the source to the server for clients to use
                mySource.Deploy("serverSource");

                // Compose a QUERY over the source (return every event carrying even data value)
                var myQuery = from e in mySource
                              where e.AttributeDataPayload.Value % 2 == 0
                              select e;

                // DEFINE a simple observer SINK (writes the value to the server console)
                var mySink = myApp.DefineObserver(() => Observer.Create<UADataChangeNotificationPayload<int>>(
                    payload => Console.WriteLine("sink_Server..: {0}", payload)));

                // DEPLOY the sink to the server for clients to use
                mySink.Deploy("serverSink");

                // BIND the query to the sink and RUN it
                var binding = myQuery.Bind(mySink);
                Debug.Assert(binding != null);
                using (/*var proc = */binding.Run("serverProcess"))
                {
                    // Wait for the user stops the server
                    Console.WriteLine("----------------------------------------------------------------");
                    Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
                    Console.WriteLine("----------------------------------------------------------------");
                    Console.WriteLine(" ");
                    Console.ReadLine();
                }
                host.Close();
            }
        }
    }
}

 

See Also

Conceptual